package com.gitee.jastee.kafka.example;

import cn.hutool.core.date.DateUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.gitee.jastee.kafka.consumer.KafkaConsumerClient;
import com.gitee.jastee.kafka.producer.KafkaProducerClient;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.*;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * 发送数据、消费数据Demo
 *
 * @Author jast
 * @Date 2020 /4/19 下午2:52
 * @Version 1.0
 */
public class ProducerConsumerDemo {

    private static final String topic = "undoneLabel";
    private static final String group = "test-consumer";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //sendData(10);
        Thread.sleep(10000);
            consumerData();
//            Thread.sleep(10000);
    }

    /**
     * 消费数据
     */
    private  static void consumerData() throws InterruptedException {
        KafkaConsumerClient consumerClient = new KafkaConsumerClient();
        KafkaConsumer<String, String> consumer = consumerClient.createConsumer(topic, group, 10, false);
        for(int i = 0 ; i < 100000 ; i ++){
            ConsumerRecords<String, String> records = consumer.poll(10000);
            for(ConsumerRecord record:records){
                System.out.println(DateUtil.date());
                System.out.println("消费数据->"+record.value()+"\t"+record.partition()+"\t"+record.offset());
            }
            System.out.println("1111");
            Thread.sleep(5000);
        }
    }
    /**
     * 发送测试数据
     * @throws ExecutionException
     * @throws InterruptedException
     */
    private static void sendData(int count) throws ExecutionException, InterruptedException {
        KafkaProducerClient producerClient = new KafkaProducerClient();
        Producer<String, String> producer = producerClient.getProducer();
        JSONObject jo = new JSONObject();
        jo.set("key","lalala");
        jo.set("timestamp",System.currentTimeMillis());
        for(int i = 0;i<count;i++) {
            String data = jo.toJSONString(0);
            System.out.println(data);
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, data);
//            producer.send(record);
            Future<RecordMetadata> future = producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    //exception不等于空属于异常发送异常
                    if(exception!=null){
                        System.out.println("发送消息异常:" + JSONUtil.toJsonStr(metadata) + "," + exception);
                        exception.printStackTrace();
                    }
                }
            });
            //这里测试使用，线上环境
//            future.get();
        }

    }




}
